import json
import os

import redis
from kafka import KafkaConsumer

# Connection settings. Each value may be overridden with an environment
# variable of the same name; the literals are the fallback defaults, so the
# script behaves exactly as before when no variable is set.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', '192.168.88.130:9092')
KAFKA_TOPIC = os.environ.get('KAFKA_TOPIC', 'agri_price_data')

REDIS_HOST = os.environ.get('REDIS_HOST', '192.168.88.130')
# Environment values are strings; port and database index must be integers.
REDIS_PORT = int(os.environ.get('REDIS_PORT', '6379'))
REDIS_DB = int(os.environ.get('REDIS_DB', '0'))
# NOTE(review): the fallback password is kept only for backward compatibility;
# supply the real credential via REDIS_PASSWORD instead of committing it here.
REDIS_PASSWORD = os.environ.get('REDIS_PASSWORD', '123456')

def _decode_json(raw):
    """Decode a UTF-8 encoded JSON payload into a Python object."""
    return json.loads(raw.decode('utf-8'))


# Kafka consumer for the price-data topic.
# - A fixed group id is used for every run of this script.
# - auto_offset_reset='earliest' makes the consumer start from the beginning
#   of the topic when the group has no committed offset.
# - Auto-commit is disabled, so offsets are only stored when committed
#   explicitly.
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    group_id='redis-consumer-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    value_deserializer=_decode_json,
)

# Redis client that receives the consumed records; connection parameters come
# from the REDIS_* settings defined at the top of the file.
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD)

# Main loop: copy every Kafka record into Redis under the key
# "<name>:<date>:<location>", storing the record as a JSON string.
print('开始消费Kafka数据并写入Redis...')
try:
    for msg in consumer:
        data = msg.value
        if isinstance(data, dict):
            # Missing fields fall back to 'unknown' so a key can always be built.
            key = f"{data.get('name','unknown')}:{data.get('date','unknown')}:{data.get('location','unknown')}"
            r.set(key, json.dumps(data, ensure_ascii=False))
            print(f"写入Redis: {key} -> {data}")
        else:
            # The key is built with dict lookups, so any other JSON value
            # (list, string, number, null) cannot be stored. Skip it rather
            # than crash the whole consumer with AttributeError.
            print(f"跳过非JSON对象消息: offset={msg.offset} -> {data!r}")
        # Commit only after the record has been handled, so a crash before
        # this point re-delivers the record on restart (at-least-once).
        # Re-delivery is harmless because the write overwrites the same key.
        # This is a synchronous commit per record: simple, but it adds one
        # broker round trip per message.
        consumer.commit()
finally:
    # Leave the consumer group cleanly on any exit, including Ctrl+C.
    consumer.close()
